Merge pull request #103 from cantino/refactor_twitter_stream

Cleanup TwitterStreamAgent and add specs

Andrew Cantino 10 years ago
parent
commit
39ffa2fcee

+ 2 - 3
app/concerns/twitter_concern.rb

@@ -19,12 +19,11 @@ module TwitterConcern
19 19
     Twitter.configure do |config|
20 20
       config.consumer_key = options[:consumer_key]
21 21
       config.consumer_secret = options[:consumer_secret]
22
-      config.oauth_token = options[:oauth_token]
23
-      config.oauth_token_secret = options[:oauth_token_secret]
22
+      config.oauth_token = options[:oauth_token] || options[:access_key]
23
+      config.oauth_token_secret = options[:oauth_token_secret] || options[:access_secret]
24 24
     end
25 25
   end
26 26
 
27 27
   module ClassMethods
28
-
29 28
   end
30 29
 end

+ 46 - 21
app/models/agents/twitter_stream_agent.rb

@@ -1,13 +1,16 @@
1 1
 module Agents
2 2
   class TwitterStreamAgent < Agent
3
+    include TwitterConcern
3 4
     cannot_receive_events!
4 5
 
5 6
     description <<-MD
6 7
       The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.
7 8
 
8
-      You must provide an oAuth `consumer_key`, `consumer_secret`, `access_key`, and `access_secret`, as well as an array of `filters`.  Multiple words in a filter
9
+      You must provide an oAuth `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`, as well as an array of `filters`.  Multiple words in a filter
9 10
       must all show up in a tweet, but are independent of order.
10 11
 
12
+      If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
13
+
11 14
       To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).
12 15
 
13 16
       Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
@@ -51,14 +54,10 @@ module Agents
51 54
     default_schedule "11pm"
52 55
 
53 56
     def validate_options
54
-      unless options[:consumer_key].present? &&
55
-             options[:consumer_secret].present? &&
56
-             options[:access_key].present? &&
57
-             options[:access_secret].present? &&
58
-             options[:filters].present? &&
57
+      unless options[:filters].present? &&
59 58
              options[:expected_update_period_in_days].present? &&
60 59
              options[:generate].present?
61
-        errors.add(:base, "expected_update_period_in_days, generate, consumer_key, consumer_secret, access_key, access_secret, and filters are required fields")
60
+        errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
62 61
       end
63 62
     end
64 63
 
@@ -70,8 +69,8 @@ module Agents
70 69
       {
71 70
           :consumer_key => "---",
72 71
           :consumer_secret => "---",
73
-          :access_key => "---",
74
-          :access_secret => "---",
72
+          :oauth_token => "---",
73
+          :oauth_token_secret => "---",
75 74
           :filters => %w[keyword1 keyword2],
76 75
           :expected_update_period_in_days => "2",
77 76
           :generate => "events"
@@ -79,25 +78,51 @@ module Agents
79 78
     end
80 79
 
81 80
     def process_tweet(filter, status)
82
-      if options[:generate] == "counts"
83
-        # Avoid memory pollution
84
-        me = Agent.find(id)
85
-        me.memory[:filter_counts] ||= {}
86
-        me.memory[:filter_counts][filter.to_sym] ||= 0
87
-        me.memory[:filter_counts][filter.to_sym] += 1
88
-        me.save!
89
-      else
90
-        create_event :payload => status.merge(:filter => filter.to_s)
81
+      filter = lookup_filter(filter)
82
+
83
+      if filter
84
+        if options[:generate] == "counts"
85
+          # Avoid memory pollution by reloading the Agent.
86
+          agent = Agent.find(id)
87
+          agent.memory[:filter_counts] ||= {}
88
+          agent.memory[:filter_counts][filter.to_sym] ||= 0
89
+          agent.memory[:filter_counts][filter.to_sym] += 1
90
+          remove_unused_keys!(agent, :filter_counts)
91
+          agent.save!
92
+        else
93
+          create_event :payload => status.merge(:filter => filter.to_s)
94
+        end
91 95
       end
92 96
     end
93 97
 
94 98
     def check
95
-      if memory[:filter_counts] && memory[:filter_counts].length > 0
99
+      if options[:generate] == "counts" && memory[:filter_counts] && memory[:filter_counts].length > 0
96 100
         memory[:filter_counts].each do |filter, count|
97 101
           create_event :payload => { :filter => filter.to_s, :count => count, :time => Time.now.to_i }
98 102
         end
99
-        memory[:filter_counts] = {}
100
-        save!
103
+      end
104
+      memory[:filter_counts] = {}
105
+    end
106
+
107
+    protected
108
+
109
+    def lookup_filter(filter)
110
+      options[:filters].each do |known_filter|
111
+        if known_filter == filter
112
+          return filter
113
+        elsif known_filter.is_a?(Array)
114
+          if known_filter.include?(filter)
115
+            return known_filter.first
116
+          end
117
+        end
118
+      end
119
+    end
120
+
121
+    def remove_unused_keys!(agent, base)
122
+      if agent.memory[base]
123
+        (agent.memory[base].keys - agent.options[:filters].map {|f| f.is_a?(Array) ? f.first.to_sym : f.to_sym }).each do |removed_key|
124
+          agent.memory[base].delete(removed_key)
125
+        end
101 126
       end
102 127
     end
103 128
   end

+ 6 - 6
bin/twitter_stream.rb

@@ -17,11 +17,11 @@ require 'pp'
17 17
 def stream!(filters, options = {}, &block)
18 18
   stream = Twitter::JSONStream.connect(
19 19
     :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
20
-    :oauth => {
20
+    :oauth   => {
21 21
       :consumer_key    => options[:consumer_key],
22 22
       :consumer_secret => options[:consumer_secret],
23
-      :access_key      => options[:access_key],
24
-      :access_secret   => options[:access_secret]
23
+      :access_key      => options[:oauth_token] || options[:access_key],
24
+      :access_secret   => options[:oauth_token_secret] || options[:access_secret]
25 25
     },
26 26
     :ssl     => true
27 27
   )
@@ -52,15 +52,15 @@ end
52 52
 
53 53
 def load_and_run(agents)
54 54
   agents.group_by { |agent| agent.options[:twitter_username] }.each do |twitter_username, agents|
55
-    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.inject({}) { |m, f| m[f] = []; m }
55
+    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
56 56
 
57 57
     agents.each do |agent|
58
-      agent.options[:filters].uniq.map(&:strip).each do |filter|
58
+      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
59 59
         filter_to_agent_map[filter] << agent
60 60
       end
61 61
     end
62 62
 
63
-    options = agents.first.options.slice(:consumer_key, :consumer_secret, :access_key, :access_secret)
63
+    options = agents.first.options.slice(:consumer_key, :consumer_secret, :access_key, :oauth_token, :access_secret, :oauth_token_secret)
64 64
 
65 65
     recent_tweets = []
66 66
 

+ 127 - 0
spec/models/agents/twitter_stream_agent_spec.rb

@@ -0,0 +1,127 @@
1
+require 'spec_helper'
2
+
3
+describe Agents::TwitterStreamAgent do
4
+  before do
5
+    @opts = {
6
+      :consumer_key => "---",
7
+      :consumer_secret => "---",
8
+      :oauth_token => "---",
9
+      :oauth_token_secret => "---",
10
+      :filters => %w[keyword1 keyword2],
11
+      :expected_update_period_in_days => "2",
12
+      :generate => "events"
13
+    }
14
+
15
+    @agent = Agents::TwitterStreamAgent.new(:name => "HuginnBot", :options => @opts)
16
+    @agent.user = users(:bob)
17
+    @agent.save!
18
+  end
19
+
20
+  describe '#process_tweet' do
21
+    context "when generate is set to 'counts'" do
22
+      before do
23
+        @agent.options[:generate] = 'counts'
24
+      end
25
+
26
+      it 'records counts' do
27
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
28
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
29
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
30
+
31
+        @agent.reload
32
+        @agent.memory[:filter_counts][:keyword1].should == 2
33
+        @agent.memory[:filter_counts][:keyword2].should == 1
34
+      end
35
+
36
+      it 'records counts for keyword sets as well' do
37
+        @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
38
+        @agent.save!
39
+
40
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
41
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
42
+        @agent.process_tweet('keyword1-1', {:text => "something", :user => {:name => "Mr. Someone"}})
43
+        @agent.process_tweet('keyword1-2', {:text => "something", :user => {:name => "Mr. Someone"}})
44
+        @agent.process_tweet('keyword1-3', {:text => "something", :user => {:name => "Mr. Someone"}})
45
+        @agent.process_tweet('keyword1-1', {:text => "something", :user => {:name => "Mr. Someone"}})
46
+
47
+        @agent.reload
48
+        @agent.memory[:filter_counts][:'keyword1-1'].should == 4 # it stores on the first keyword
49
+        @agent.memory[:filter_counts][:keyword2].should == 2
50
+      end
51
+
52
+      it 'removes unused keys' do
53
+        @agent.memory[:filter_counts] = {:keyword1 => 2, :keyword2 => 3, :keyword3 => 4}
54
+        @agent.save!
55
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
56
+        @agent.reload.memory[:filter_counts].should == {:keyword1 => 3, :keyword2 => 3}
57
+      end
58
+    end
59
+
60
+    context "when generate is set to 'events'" do
61
+      it 'emits events immediately' do
62
+        lambda {
63
+          @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
64
+        }.should change { @agent.events.count }.by(1)
65
+
66
+        @agent.events.last.payload.should == {
67
+          :filter => 'keyword1',
68
+          :text => "something",
69
+          :user => {:name => "Mr. Someone"}
70
+        }
71
+      end
72
+
73
+      it 'handles keyword sets too' do
74
+        @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
75
+        @agent.save!
76
+
77
+        lambda {
78
+          @agent.process_tweet('keyword1-2', {:text => "something", :user => {:name => "Mr. Someone"}})
79
+        }.should change { @agent.events.count }.by(1)
80
+
81
+        @agent.events.last.payload.should == {
82
+          :filter => 'keyword1-1',
83
+          :text => "something",
84
+          :user => {:name => "Mr. Someone"}
85
+        }
86
+      end
87
+    end
88
+  end
89
+
90
+  describe '#check' do
91
+    context "when generate is set to 'counts'" do
92
+      before do
93
+        @agent.options[:generate] = 'counts'
94
+        @agent.save!
95
+      end
96
+
97
+      it 'emits events' do
98
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
99
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
100
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
101
+
102
+        lambda {
103
+          @agent.reload.check
104
+        }.should change { @agent.events.count }.by(2)
105
+
106
+        @agent.events[-1].payload[:filter].should == 'keyword1'
107
+        @agent.events[-1].payload[:count].should == 2
108
+
109
+        @agent.events[-2].payload[:filter].should == 'keyword2'
110
+        @agent.events[-2].payload[:count].should == 1
111
+
112
+        @agent.memory[:filter_counts].should == {}
113
+      end
114
+    end
115
+
116
+    context "when generate is not set to 'counts'" do
117
+      it 'does nothing' do
118
+        @agent.memory[:filter_counts] = { :keyword1 => 2 }
119
+        @agent.save!
120
+        lambda {
121
+          @agent.reload.check
122
+        }.should_not change { Event.count }
123
+        @agent.memory[:filter_counts].should == {}
124
+      end
125
+    end
126
+  end
127
+end